{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "6aec3f0b-6505-4e18-bb37-fab4a0397d1e",
   "metadata": {},
   "source": [
    "# Fraud Detection using Isolation Forest\n",
    "### Introduction\n",
    "We use the dataset of [ULB Credit Card Dataset](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud) to train our frauld detection model. In this notebook, we will use [Isolation Forest](https://mmlspark.blob.core.windows.net/docs/0.9.1/pyspark/synapse.ml.isolationforest.html) algorithm, which refers to some execellent work listed as below:\n",
    "\n",
    "* **Fraud detection handbook**: https://fraud-detection-handbook.github.io/fraud-detection-handbook/Foreword.html\n",
    "* **AWS creditcard fraud detector**: https://github.com/awslabs/fraud-detection-using-machine-learning/blob/master/source/notebooks/sagemaker_fraud_detection.ipynb\n",
    "* **Anomaly Detection using different methods**: https://www.kaggle.com/code/adepvenugopal/anomaly-detection-using-different-methods\n",
    "\n",
    "In a fraud detection scenario, we may have very few labeled examples, and it's possible that labeling fraud takes a very long time. Isolation Forest, as an unsupervised learning algorithm, is very scalable and can help us to identify the fraud data only based on features if there is little labled data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82357282-0b14-4823-8685-f8199dba49e0",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "import yaml\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import warnings\n",
    "\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql.types import FloatType, DoubleType\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "from pyspark.mllib.evaluation import MulticlassMetrics\n",
    "\n",
    "\n",
    "warnings.filterwarnings('ignore')\n",
    "pd.set_option('display.max_rows', None)\n",
    "pd.set_option('display.max_columns', None)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "17373c1c-92eb-49c2-b1b1-2c25800a7892",
   "metadata": {},
   "outputs": [],
   "source": [
    "def init_spark():\n",
    "    spark = pyspark.sql.SparkSession.builder\\\n",
    "            .appName(\"Fraud Detection-LightGBM\") \\\n",
    "            .config(\"spark.executor.memory\",\"8G\") \\\n",
    "            .config(\"spark.executor.instances\",\"4\") \\\n",
    "            .config(\"spark.executor.cores\", \"4\") \\\n",
    "            .config(\"spark.jars.packages\", \"com.microsoft.azure:synapseml_2.12:0.9.4\") \\\n",
    "            .config(\"spark.jars.repositories\", \"https://mmlspark.azureedge.net/maven\") \\\n",
    "            .getOrCreate()\n",
    "    sc = spark.sparkContext\n",
    "    print(sc.version)\n",
    "    print(sc.applicationId)\n",
    "    print(sc.uiWebUrl)\n",
    "    return spark\n",
    "\n",
    "def load_config(path):\n",
    "    params = dict()\n",
    "    with open(path, 'r') as stream:\n",
    "        params = yaml.load(stream, Loader=yaml.FullLoader)\n",
    "    return params\n",
    "\n",
    "def read_dataset(spark, data_path):\n",
    "    dataset = spark.read.format(\"csv\")\\\n",
    "      .option(\"header\",  True)\\\n",
    "      .option(\"inferSchema\",  True)\\\n",
    "      .load(data_path)  \n",
    "    return dataset\n",
    "\n",
    "def get_vectorassembler(dataset, features='features', label='label'):\n",
    "    featurizer = VectorAssembler(\n",
    "        inputCols = feature_cols,\n",
    "        outputCol = 'features',\n",
    "        handleInvalid = 'skip'\n",
    "    )\n",
    "    dataset = featurizer.transform(dataset)[label, features]\n",
    "    return dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2acc7fa5-dad2-4449-9768-b328f7178915",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = init_spark()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5f2d61ca-d3c6-42c3-a179-936da0623289",
   "metadata": {},
   "source": [
    "### Train detection model using Isolation Forest\n",
    "\n",
    "Here we are using [Isolation Forest](https://mmlspark.blob.core.windows.net/docs/0.9.1/pyspark/synapse.ml.isolationforest.html) to train our fraud detection model. Moreover, we will test the model performance by using multiple metrics, such as AUC, KS, Balanced accuracy, Cohen's kappa and Confusion Matrix. \n",
    "\n",
    "Moreover, You should replace `{MY_S3_BUCKET}`, `{TRAIN_S3_PATH}` and `{TEST_S3_PATH}` with actual values before executing code cells containing these placeholders."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a100fa29-7072-4a8d-a03e-a1646176484f",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_file_path = 's3://{MY_S3_BUCKET}/{TRAIN_S3_PATH}'\n",
    "test_file_path = 's3://{MY_S3_BUCKET}/{TEST_S3_PATH}'\n",
    "fg_train_dataset = read_dataset(spark, train_file_path)\n",
    "fg_test_dataset = read_dataset(spark, test_file_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "612f77a9-8943-4c81-b1e2-e1bbd5a3e325",
   "metadata": {},
   "outputs": [],
   "source": [
    "fg_train_dataset.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b256cb36-c929-4c36-964b-5b4816102a17",
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_cols = fg_train_dataset.columns[:-1]\n",
    "feature_cols"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f206483a-aa0c-412e-9d0f-6bb1f010f572",
   "metadata": {},
   "outputs": [],
   "source": [
    "label_col = fg_train_dataset.columns[-1]\n",
    "label_col"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aaec440f-6170-4e39-b3f7-eeacfc32616c",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data = get_vectorassembler(fg_train_dataset, label=label_col, features='features')\n",
    "test_data = get_vectorassembler(fg_test_dataset, label=label_col, features='features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "671bf0d0-f0f4-49fa-aafd-f68407353bcb",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data.limit(10).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "32e5e8ed-7579-481a-bdf4-6ff8182772fe",
   "metadata": {},
   "outputs": [],
   "source": [
    "train, valid = train_data.randomSplit([0.90, 0.10], seed=2022)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "93c0b0ea-b1c0-420f-b896-5b0d76e44003",
   "metadata": {},
   "outputs": [],
   "source": [
    "model_params = {\n",
    "    'numEstimators': 100,\n",
    "    'bootstrap': False,\n",
    "    'maxSamples': 256,\n",
    "    'maxFeatures': 1.0,\n",
    "    'contamination': 0.02,\n",
    "    'contaminationError': 0.02 * 0.01,\n",
    "    'randomSeed': 2022\n",
    "}\n",
    "\n",
    "def train_isolationforest(train_dataset, feature_col, label_col, model_params):\n",
    "    from synapse.ml.isolationforest import IsolationForest\n",
    "    model = IsolationForest(featuresCol='features', predictionCol='predictedLabel', scoreCol='rawPrediction', **model_params)\n",
    "    model = model.fit(train_dataset)\n",
    "    return model\n",
    "\n",
    "def evaluate(predictions, label_col, metricName=\"areaUnderROC\"):\n",
    "    evaluator = BinaryClassificationEvaluator(labelCol=label_col, metricName=\"areaUnderROC\")\n",
    "    return evaluator.evaluate(predictions)\n",
    "\n",
    "model = train_isolationforest(train, 'features', 'Class', model_params)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cf614b65-9230-41b8-b610-2d98cffbc7ec",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"train dataset prediciton:\")\n",
    "predictions = model.transform(train_data)\n",
    "print(\"train dataset auc:\", evaluate(predictions, label_col))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9b07945d-bf55-4e7a-bce9-820f7ccae53f",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"validation dataset prediciton:\")\n",
    "predictions = model.transform(valid)\n",
    "print(\"validation dataset auc:\", evaluate(predictions, label_col))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1af3c88d-318b-4a11-9aa6-5670a157db15",
   "metadata": {},
   "outputs": [],
   "source": [
    "model = train_isolationforest(train_data, 'features', 'Class', model_params)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b4635383-fae7-43a3-a182-a52be8cfacd3",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"test dataset prediciton:\")\n",
    "predictions = model.transform(test_data)\n",
    "print(\"test dataset auc:\", evaluate(predictions, label_col))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "52f246f2-785e-4794-9175-8548ec9abce4",
   "metadata": {},
   "outputs": [],
   "source": [
    "predictionAndLabels = predictions.select('prediction', F.col(label_col).cast(DoubleType()))\\\n",
    "                                 .withColumnRenamed(label_col, 'label')\n",
    "metrics = MulticlassMetrics(predictionAndLabels.rdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "02a2be7b-9f52-4152-939c-baaa03b51b13",
   "metadata": {},
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sn\n",
    "plt.figure(figsize = (10,7))\n",
    "sn.heatmap(metrics.confusionMatrix().toArray(), \n",
    "           xticklabels=['Not Fraud', 'Fraud'],\n",
    "           yticklabels=['Not Fraud', 'Fraud'],\n",
    "           linewidths=5, fmt='g', annot=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "53782ee2-f726-48d2-b95d-533369dea4c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "sys.path.append('../../') \n",
    "from common.ks_utils import ks_2samp, ks_curve\n",
    "\n",
    "label = np.array(predictions.select(label_col).collect()).reshape(-1).astype(np.float32)\n",
    "prediction = np.array(predictions.select('rawPrediction').collect())[:, 0].reshape(-1)\n",
    "print('label: ', label[0:10])\n",
    "print('prediction: ', prediction[0:10])\n",
    "\n",
    "ks = ks_2samp(label, prediction)\n",
    "print(\"KS statistic: \", ks.statistic)\n",
    "ks_curve(label, prediction)\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f86249ee-f08b-454d-aad4-25064072b204",
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.metrics import balanced_accuracy_score, cohen_kappa_score\n",
    "\n",
    "# scikit-learn expects 0/1 predictions, so we threshold our raw predictions\n",
    "y_preds = np.where(prediction > 0.5, 1, 0)\n",
    "print(\"Balanced accuracy = {}\".format(balanced_accuracy_score(label, y_preds)))\n",
    "print(\"Cohen's Kappa = {}\".format(cohen_kappa_score(label, y_preds)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dfc3f7ca-8d78-4d8f-bd04-a3cbc841b4a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8c7158eb-9ffb-4a66-8ff9-c1f56277224e",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  },
  "vscode": {
   "interpreter": {
    "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
